# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS.  All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""Test data generators producing signals pairs intended to be used to
test the APM module. Each pair consists of a noisy input and a reference signal.
The former is used as APM input and it is generated by adding noise to a
clean audio track. The reference is the expected APM output.

Throughout this file, the following naming convention is used:
  - input signal: the clean signal (e.g., speech),
  - noise signal: the noise to be summed up to the input signal (e.g., white
    noise, Gaussian noise),
  - noisy signal: input + noise.
The noise signal may or may not be a function of the clean signal. For
instance, white noise is independently generated, whereas reverberation is
obtained by convolving the input signal with an impulse response.
"""

import logging
import os
import shutil
import sys

try:
    import scipy.io
except ImportError:
    logging.critical('Cannot import the third-party Python package scipy')
    sys.exit(1)

from . import data_access
from . import exceptions
from . import signal_processing


class TestDataGenerator(object):
    """Abstract class responsible for the generation of noisy signals.

  Given a clean signal, it generates two streams named noisy signal and
  reference. The former is the clean signal deteriorated by the noise source,
  the latter goes through the same deterioration process, but more "gently".
  Noisy signal and reference are produced so that the reference is the signal
  expected at the output of the APM module when the latter is fed with the noisy
  signal.

  An test data generator generates one or more pairs.
  """

    NAME = None
    REGISTERED_CLASSES = {}

    def __init__(self, output_directory_prefix):
        self._output_directory_prefix = output_directory_prefix
        # Init dictionaries with one entry for each test data generator
        # configuration (e.g., different SNRs).
        # Noisy audio track files (stored separately in a cache folder).
        self._noisy_signal_filepaths = None
        # Path to be used for the APM simulation output files.
        self._apm_output_paths = None
        # Reference audio track files (stored separately in a cache folder).
        self._reference_signal_filepaths = None
        self.Clear()

    @classmethod
    def RegisterClass(cls, class_to_register):
        """Registers a TestDataGenerator implementation.

    Decorator to automatically register the classes that extend
    TestDataGenerator.
    Example usage:

    @TestDataGenerator.RegisterClass
    class IdentityGenerator(TestDataGenerator):
      pass
    """
        cls.REGISTERED_CLASSES[class_to_register.NAME] = class_to_register
        return class_to_register

    @property
    def config_names(self):
        return self._noisy_signal_filepaths.keys()

    @property
    def noisy_signal_filepaths(self):
        return self._noisy_signal_filepaths

    @property
    def apm_output_paths(self):
        return self._apm_output_paths

    @property
    def reference_signal_filepaths(self):
        return self._reference_signal_filepaths

    def Generate(self, input_signal_filepath, test_data_cache_path,
                 base_output_path):
        """Generates a set of noisy input and reference audiotrack file pairs.

    This method initializes an empty set of pairs and calls the _Generate()
    method implemented in a concrete class.

    Args:
      input_signal_filepath: path to the clean input audio track file.
      test_data_cache_path: path to the cache of the generated audio track
                            files.
      base_output_path: base path where output is written.
    """
        self.Clear()
        self._Generate(input_signal_filepath, test_data_cache_path,
                       base_output_path)

    def Clear(self):
        """Clears the generated output path dictionaries.
    """
        self._noisy_signal_filepaths = {}
        self._apm_output_paths = {}
        self._reference_signal_filepaths = {}

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Abstract method to be implemented in each concrete class.
    """
        raise NotImplementedError()

    def _AddNoiseSnrPairs(self, base_output_path, noisy_mix_filepaths,
                          snr_value_pairs):
        """Adds noisy-reference signal pairs.

    Args:
      base_output_path: noisy tracks base output path.
      noisy_mix_filepaths: nested dictionary of noisy signal paths organized
                           by noisy track name and SNR level.
      snr_value_pairs: list of SNR pairs.
    """
        for noise_track_name in noisy_mix_filepaths:
            for snr_noisy, snr_refence in snr_value_pairs:
                config_name = '{0}_{1:d}_{2:d}_SNR'.format(
                    noise_track_name, snr_noisy, snr_refence)
                output_path = self._MakeDir(base_output_path, config_name)
                self._AddNoiseReferenceFilesPair(
                    config_name=config_name,
                    noisy_signal_filepath=noisy_mix_filepaths[noise_track_name]
                    [snr_noisy],
                    reference_signal_filepath=noisy_mix_filepaths[
                        noise_track_name][snr_refence],
                    output_path=output_path)

    def _AddNoiseReferenceFilesPair(self, config_name, noisy_signal_filepath,
                                    reference_signal_filepath, output_path):
        """Adds one noisy-reference signal pair.

    Args:
      config_name: name of the APM configuration.
      noisy_signal_filepath: path to noisy audio track file.
      reference_signal_filepath: path to reference audio track file.
      output_path: APM output path.
    """
        assert config_name not in self._noisy_signal_filepaths
        self._noisy_signal_filepaths[config_name] = os.path.abspath(
            noisy_signal_filepath)
        self._apm_output_paths[config_name] = os.path.abspath(output_path)
        self._reference_signal_filepaths[config_name] = os.path.abspath(
            reference_signal_filepath)

    def _MakeDir(self, base_output_path, test_data_generator_config_name):
        output_path = os.path.join(
            base_output_path,
            self._output_directory_prefix + test_data_generator_config_name)
        data_access.MakeDirectory(output_path)
        return output_path


@TestDataGenerator.RegisterClass
class IdentityTestDataGenerator(TestDataGenerator):
    """Generator that adds no noise.

  Both the noisy and the reference signals are the input signal.
  """

    NAME = 'identity'

    def __init__(self, output_directory_prefix, copy_with_identity):
        TestDataGenerator.__init__(self, output_directory_prefix)
        self._copy_with_identity = copy_with_identity

    @property
    def copy_with_identity(self):
        return self._copy_with_identity

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        config_name = 'default'
        output_path = self._MakeDir(base_output_path, config_name)

        if self._copy_with_identity:
            input_signal_filepath_new = os.path.join(
                test_data_cache_path,
                os.path.split(input_signal_filepath)[1])
            logging.info('copying ' + input_signal_filepath + ' to ' +
                         (input_signal_filepath_new))
            shutil.copy(input_signal_filepath, input_signal_filepath_new)
            input_signal_filepath = input_signal_filepath_new

        self._AddNoiseReferenceFilesPair(
            config_name=config_name,
            noisy_signal_filepath=input_signal_filepath,
            reference_signal_filepath=input_signal_filepath,
            output_path=output_path)


@TestDataGenerator.RegisterClass
class WhiteNoiseTestDataGenerator(TestDataGenerator):
    """Generator that adds white noise.
  """

    NAME = 'white_noise'

    # Each pair indicates the clean vs. noisy and reference vs. noisy SNRs.
    # The reference (second value of each pair) always has a lower amount of noise
    # - i.e., the SNR is 10 dB higher.
    _SNR_VALUE_PAIRS = [
        [20, 30],  # Smallest noise.
        [10, 20],
        [5, 15],
        [0, 10],  # Largest noise.
    ]

    _NOISY_SIGNAL_FILENAME_TEMPLATE = 'noise_{0:d}_SNR.wav'

    def __init__(self, output_directory_prefix):
        TestDataGenerator.__init__(self, output_directory_prefix)

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        # Load the input signal.
        input_signal = signal_processing.SignalProcessingUtils.LoadWav(
            input_signal_filepath)

        # Create the noise track.
        noise_signal = signal_processing.SignalProcessingUtils.GenerateWhiteNoise(
            input_signal)

        # Create the noisy mixes (once for each unique SNR value).
        noisy_mix_filepaths = {}
        snr_values = set(
            [snr for pair in self._SNR_VALUE_PAIRS for snr in pair])
        for snr in snr_values:
            noisy_signal_filepath = os.path.join(
                test_data_cache_path,
                self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(snr))

            # Create and save if not done.
            if not os.path.exists(noisy_signal_filepath):
                # Create noisy signal.
                noisy_signal = signal_processing.SignalProcessingUtils.MixSignals(
                    input_signal, noise_signal, snr)

                # Save.
                signal_processing.SignalProcessingUtils.SaveWav(
                    noisy_signal_filepath, noisy_signal)

            # Add file to the collection of mixes.
            noisy_mix_filepaths[snr] = noisy_signal_filepath

        # Add all the noisy-reference signal pairs.
        for snr_noisy, snr_refence in self._SNR_VALUE_PAIRS:
            config_name = '{0:d}_{1:d}_SNR'.format(snr_noisy, snr_refence)
            output_path = self._MakeDir(base_output_path, config_name)
            self._AddNoiseReferenceFilesPair(
                config_name=config_name,
                noisy_signal_filepath=noisy_mix_filepaths[snr_noisy],
                reference_signal_filepath=noisy_mix_filepaths[snr_refence],
                output_path=output_path)


# TODO(alessiob): remove comment when class implemented.
# @TestDataGenerator.RegisterClass
class NarrowBandNoiseTestDataGenerator(TestDataGenerator):
    """Generator that adds narrow-band noise.
  """

    NAME = 'narrow_band_noise'

    def __init__(self, output_directory_prefix):
        TestDataGenerator.__init__(self, output_directory_prefix)

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        # TODO(alessiob): implement.
        pass


@TestDataGenerator.RegisterClass
class AdditiveNoiseTestDataGenerator(TestDataGenerator):
    """Generator that adds noise loops.

  This generator uses all the wav files in a given path (default: noise_tracks/)
  and mixes them to the clean speech with different target SNRs (hard-coded).
  """

    NAME = 'additive_noise'
    _NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'

    DEFAULT_NOISE_TRACKS_PATH = os.path.join(os.path.dirname(__file__),
                                             os.pardir, 'noise_tracks')

    # TODO(alessiob): Make the list of SNR pairs customizable.
    # Each pair indicates the clean vs. noisy and reference vs. noisy SNRs.
    # The reference (second value of each pair) always has a lower amount of noise
    # - i.e., the SNR is 10 dB higher.
    _SNR_VALUE_PAIRS = [
        [20, 30],  # Smallest noise.
        [10, 20],
        [5, 15],
        [0, 10],  # Largest noise.
    ]

    def __init__(self, output_directory_prefix, noise_tracks_path):
        TestDataGenerator.__init__(self, output_directory_prefix)
        self._noise_tracks_path = noise_tracks_path
        self._noise_tracks_file_names = [
            n for n in os.listdir(self._noise_tracks_path)
            if n.lower().endswith('.wav')
        ]
        if len(self._noise_tracks_file_names) == 0:
            raise exceptions.InitializationException(
                'No wav files found in the noise tracks path %s' %
                (self._noise_tracks_path))

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Generates test data pairs using environmental noise.

    For each noise track and pair of SNR values, the following two audio tracks
    are created: the noisy signal and the reference signal. The former is
    obtained by mixing the (clean) input signal to the corresponding noise
    track enforcing the target SNR.
    """
        # Init.
        snr_values = set(
            [snr for pair in self._SNR_VALUE_PAIRS for snr in pair])

        # Load the input signal.
        input_signal = signal_processing.SignalProcessingUtils.LoadWav(
            input_signal_filepath)

        noisy_mix_filepaths = {}
        for noise_track_filename in self._noise_tracks_file_names:
            # Load the noise track.
            noise_track_name, _ = os.path.splitext(noise_track_filename)
            noise_track_filepath = os.path.join(self._noise_tracks_path,
                                                noise_track_filename)
            if not os.path.exists(noise_track_filepath):
                logging.error('cannot find the <%s> noise track',
                              noise_track_filename)
                raise exceptions.FileNotFoundError()

            noise_signal = signal_processing.SignalProcessingUtils.LoadWav(
                noise_track_filepath)

            # Create the noisy mixes (once for each unique SNR value).
            noisy_mix_filepaths[noise_track_name] = {}
            for snr in snr_values:
                noisy_signal_filepath = os.path.join(
                    test_data_cache_path,
                    self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(
                        noise_track_name, snr))

                # Create and save if not done.
                if not os.path.exists(noisy_signal_filepath):
                    # Create noisy signal.
                    noisy_signal = signal_processing.SignalProcessingUtils.MixSignals(
                        input_signal,
                        noise_signal,
                        snr,
                        pad_noise=signal_processing.SignalProcessingUtils.
                        MixPadding.LOOP)

                    # Save.
                    signal_processing.SignalProcessingUtils.SaveWav(
                        noisy_signal_filepath, noisy_signal)

                # Add file to the collection of mixes.
                noisy_mix_filepaths[noise_track_name][
                    snr] = noisy_signal_filepath

        # Add all the noise-SNR pairs.
        self._AddNoiseSnrPairs(base_output_path, noisy_mix_filepaths,
                               self._SNR_VALUE_PAIRS)


@TestDataGenerator.RegisterClass
class ReverberationTestDataGenerator(TestDataGenerator):
    """Generator that adds reverberation noise.

  TODO(alessiob): Make this class more generic since the impulse response can be
  anything (not just reverberation); call it e.g.,
  ConvolutionalNoiseTestDataGenerator.
  """

    NAME = 'reverberation'

    _IMPULSE_RESPONSES = {
        'lecture': 'air_binaural_lecture_0_0_1.mat',  # Long echo.
        'booth': 'air_binaural_booth_0_0_1.mat',  # Short echo.
    }
    _MAX_IMPULSE_RESPONSE_LENGTH = None

    # Each pair indicates the clean vs. noisy and reference vs. noisy SNRs.
    # The reference (second value of each pair) always has a lower amount of noise
    # - i.e., the SNR is 5 dB higher.
    _SNR_VALUE_PAIRS = [
        [3, 8],  # Smallest noise.
        [-3, 2],  # Largest noise.
    ]

    _NOISE_TRACK_FILENAME_TEMPLATE = '{0}.wav'
    _NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'

    def __init__(self, output_directory_prefix, aechen_ir_database_path):
        TestDataGenerator.__init__(self, output_directory_prefix)
        self._aechen_ir_database_path = aechen_ir_database_path

    def _Generate(self, input_signal_filepath, test_data_cache_path,
                  base_output_path):
        """Generates test data pairs using reverberation noise.

    For each impulse response, one noise track is created. For each impulse
    response and pair of SNR values, the following 2 audio tracks are
    created: the noisy signal and the reference signal. The former is
    obtained by mixing the (clean) input signal to the corresponding noise
    track enforcing the target SNR.
    """
        # Init.
        snr_values = set(
            [snr for pair in self._SNR_VALUE_PAIRS for snr in pair])

        # Load the input signal.
        input_signal = signal_processing.SignalProcessingUtils.LoadWav(
            input_signal_filepath)

        noisy_mix_filepaths = {}
        for impulse_response_name in self._IMPULSE_RESPONSES:
            noise_track_filename = self._NOISE_TRACK_FILENAME_TEMPLATE.format(
                impulse_response_name)
            noise_track_filepath = os.path.join(test_data_cache_path,
                                                noise_track_filename)
            noise_signal = None
            try:
                # Load noise track.
                noise_signal = signal_processing.SignalProcessingUtils.LoadWav(
                    noise_track_filepath)
            except exceptions.FileNotFoundError:
                # Generate noise track by applying the impulse response.
                impulse_response_filepath = os.path.join(
                    self._aechen_ir_database_path,
                    self._IMPULSE_RESPONSES[impulse_response_name])
                noise_signal = self._GenerateNoiseTrack(
                    noise_track_filepath, input_signal,
                    impulse_response_filepath)
            assert noise_signal is not None

            # Create the noisy mixes (once for each unique SNR value).
            noisy_mix_filepaths[impulse_response_name] = {}
            for snr in snr_values:
                noisy_signal_filepath = os.path.join(
                    test_data_cache_path,
                    self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(
                        impulse_response_name, snr))

                # Create and save if not done.
                if not os.path.exists(noisy_signal_filepath):
                    # Create noisy signal.
                    noisy_signal = signal_processing.SignalProcessingUtils.MixSignals(
                        input_signal, noise_signal, snr)

                    # Save.
                    signal_processing.SignalProcessingUtils.SaveWav(
                        noisy_signal_filepath, noisy_signal)

                # Add file to the collection of mixes.
                noisy_mix_filepaths[impulse_response_name][
                    snr] = noisy_signal_filepath

        # Add all the noise-SNR pairs.
        self._AddNoiseSnrPairs(base_output_path, noisy_mix_filepaths,
                               self._SNR_VALUE_PAIRS)

    def _GenerateNoiseTrack(self, noise_track_filepath, input_signal,
                            impulse_response_filepath):
        """Generates noise track.

    Generate a signal by convolving input_signal with the impulse response in
    impulse_response_filepath; then save to noise_track_filepath.

    Args:
      noise_track_filepath: output file path for the noise track.
      input_signal: (clean) input signal samples.
      impulse_response_filepath: impulse response file path.

    Returns:
      AudioSegment instance.
    """
        # Load impulse response.
        data = scipy.io.loadmat(impulse_response_filepath)
        impulse_response = data['h_air'].flatten()
        if self._MAX_IMPULSE_RESPONSE_LENGTH is not None:
            logging.info('truncating impulse response from %d to %d samples',
                         len(impulse_response),
                         self._MAX_IMPULSE_RESPONSE_LENGTH)
            impulse_response = impulse_response[:self.
                                                _MAX_IMPULSE_RESPONSE_LENGTH]

        # Apply impulse response.
        processed_signal = (
            signal_processing.SignalProcessingUtils.ApplyImpulseResponse(
                input_signal, impulse_response))

        # Save.
        signal_processing.SignalProcessingUtils.SaveWav(
            noise_track_filepath, processed_signal)

        return processed_signal
